package conn

import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"time"

	"gitee.com/ander888/tools/zlog"
	kfk "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

// KfkClient bundles a kafka-go writer (producer) and reader (consumer).
// The zero value is not usable: call NewKafkaProducer / NewKafkaConsumer
// to initialize the corresponding field before calling Write / Reade.
type KfkClient struct {
	Producer *kfk.Writer
	Consumer *kfk.Reader
}

// NewKafkaProducer initializes kfc.Producer for the given topic and broker
// address. When both user and pwd are empty, SASL authentication is disabled;
// otherwise SCRAM-SHA-512 is used over a dedicated transport.
func (kfc *KfkClient) NewKafkaProducer(user, pwd, topic, node string) (err error) {
	if topic == "" || node == "" {
		err = errors.New("topic or nodes is empty")
		zlog.ErrWithStr(err).Msg("topic:" + topic + " nodes:" + node)
		return
	}

	var transport *kfk.Transport
	if user != "" || pwd != "" {
		// Only build a SASL transport when credentials were supplied.
		mechanism, mErr := scram.Mechanism(scram.SHA512, user, pwd)
		if mErr != nil {
			// Preserve the underlying cause instead of replacing it, and never
			// log the plaintext password.
			err = fmt.Errorf("creating SCRAM mechanism for user %q: %w", user, mErr)
			zlog.ErrWithStr(err).Msg("user:" + user)
			return
		}
		transport = &kfk.Transport{SASL: mechanism}
	} else {
		zlog.Info("Do not enable user password verification")
	}

	kfc.Producer = &kfk.Writer{
		Addr:         kfk.TCP(node), // kfk.TCP is variadic; multiple addresses may form a cluster
		Topic:        topic,
		Balancer:     &kfk.Hash{},     // hash the message key to choose the partition
		WriteTimeout: 3 * time.Second, // give up on writes under heavy broker load (acceptable for lossy scenarios)
		RequiredAcks: kfk.RequireNone, // return without waiting for any broker acknowledgement
		Transport:    transport,       // nil selects the default transport (no SASL)
	}
	return
}

// NewKafkaConsumer initializes kfc.Consumer for topic/groupID against the
// given brokers. When user and pwd are both non-empty, SCRAM-SHA-512 SASL is
// enabled on the dialer; otherwise the default (unauthenticated) dialer is used.
func (kfc *KfkClient) NewKafkaConsumer(user, pwd, topic, groupID string, nodes []string) (err error) {
	if topic == "" || len(nodes) == 0 {
		err = errors.New("topic or nodes is empty")
		zlog.ErrWithStr(err).Any(topic, nodes).Msg("")
		return
	}

	cfg := kfk.ReaderConfig{
		Brokers:        nodes, // broker address list
		GroupID:        groupID,
		Topic:          topic,           // topic to consume
		CommitInterval: time.Second,     // offset commit interval
		StartOffset:    kfk.FirstOffset, // only affects brand-new consumer groups; kfk.LastOffset is common in production
	}

	if user != "" && pwd != "" {
		// Build the SASL mechanism only when credentials are actually used,
		// so empty credentials never fail here. Do not log the password.
		mechanism, mErr := scram.Mechanism(scram.SHA512, user, pwd)
		if mErr != nil {
			err = fmt.Errorf("creating SCRAM mechanism for user %q: %w", user, mErr)
			zlog.ErrWithStr(err).Msg("user:" + user)
			return
		}
		cfg.Dialer = &kfk.Dialer{
			Timeout:       10 * time.Second,
			DualStack:     true,
			SASLMechanism: mechanism,
		}
	}

	kfc.Consumer = kfk.NewReader(cfg)
	return
}

// Write publishes a single message to the producer's topic; the key selects
// the partition via the Hash balancer. The writer is left open so Write can
// be called repeatedly — close kfc.Producer explicitly when finished.
func (kfc *KfkClient) Write(key, msg []byte) (err error) {
	// BUG FIX: the previous `defer kfc.Producer.Close()` closed the writer
	// after the first call, making every subsequent Write fail.
	err = kfc.Producer.WriteMessages(context.Background(), kfk.Message{
		Key:   key,
		Value: msg,
	})
	return
}

// Reade reads one message from the consumer and returns its key, value, and
// any read error. (Name kept for backward compatibility; "Read" was likely
// intended.) The reader is left open so Reade can be called repeatedly —
// close kfc.Consumer explicitly when finished.
func (kfc *KfkClient) Reade() (key, msg []byte, err error) {
	// BUG FIX: the previous `defer kfc.Consumer.Close()` closed the reader
	// after the first call, making every subsequent read fail.
	msgInfo, err := kfc.Consumer.ReadMessage(context.Background())
	return msgInfo.Key, msgInfo.Value, err
}

// TopicList connects to the broker at nodes, prints every unique topic name
// to stdout, and returns any connection/metadata error.
func (kfc *KfkClient) TopicList(nodes string) (err error) {
	conn, err := kfk.Dial("tcp", nodes)
	if err != nil {
		// BUG FIX: library code must not panic on a dial failure; return the
		// error so the caller can handle it.
		return fmt.Errorf("dialing %s: %w", nodes, err)
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		return fmt.Errorf("reading partitions from %s: %w", nodes, err)
	}

	// Deduplicate: each topic appears once per partition.
	seen := make(map[string]struct{}, len(partitions))
	for _, p := range partitions {
		seen[p.Topic] = struct{}{}
	}
	for topic := range seen {
		fmt.Println(topic)
	}
	return nil
}

// TopicCreate creates topicName with 1 partition and replication factor 1 on
// the cluster reachable via nodes. It dials a bootstrap broker, resolves the
// cluster controller, and issues the create request against the controller
// directly (topic creation must go through the controller).
func (kfc *KfkClient) TopicCreate(topicName, nodes string) (err error) {
	conn, err := kfk.Dial("tcp", nodes)
	if err != nil {
		return fmt.Errorf("dialing %s: %w", nodes, err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		return fmt.Errorf("resolving cluster controller: %w", err)
	}

	controllerConn, err := kfk.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		return fmt.Errorf("dialing controller %s:%d: %w", controller.Host, controller.Port, err)
	}
	defer controllerConn.Close()

	err = controllerConn.CreateTopics(kfk.TopicConfig{
		Topic:             topicName,
		NumPartitions:     1, // single partition by default
		ReplicationFactor: 1,
	})
	if err != nil {
		return fmt.Errorf("creating topic %q: %w", topicName, err)
	}
	return nil
}
